Dubbo源码分析9 -Dubbo 服务引入

计划阅读并调试Dubbo的源码,结合官方的源码分析文档,再加上自己的分析与总结

本文对应的是 Dubbo 服务引入部分

源码分析

引入服务

接着上一篇,进入createProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
 * Creates the client-side proxy for the referenced service.
 * <p>
 * The invoker is resolved in one of three ways: as an in-JVM reference, from the
 * user-specified {@code url} field (split on {@code Constants.SEMICOLON_SPLIT_PATTERN}), or
 * from the registries returned by {@code loadRegistries(false)}. When more than one URL
 * results, the individual invokers are joined through {@code cluster}. Afterwards the
 * invoker is optionally checked for availability and passed to {@code proxyFactory.getProxy}.
 *
 * @param map parameters used when building the URLs below
 * @return the proxy produced by {@code proxyFactory.getProxy(invoker)}
 * @throws IllegalStateException if no registry URL could be loaded, or if the availability
 *         check is enabled and the invoker reports that it is not available
 */
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
// Throwaway URL with protocol "temp"; only used for the in-JVM check below
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) {
// A non-empty url field may mean direct-connect mode, so do not refer in-JVM
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {

isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
// An explicit injvm setting wins over the detection above
isJvmRefer = isInjvm().booleanValue();
}

if (isJvmRefer) {
// Note: this local "url" shadows the String field of the same name
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
// Fall back to the interface name when the address carries no path
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// Registry address: carry the reference parameters as the encoded REFER_KEY parameter
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// Any other address: merge the reference parameters into the URL
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// Load the registry addresses
List<URL> us = loadRegistries(false);
// Turn every registry address into a URL to refer from
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// Load the monitor for this registry and, if there is one, record it in the parameter map
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// Exactly one URL: refer it directly
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// Several URLs: refer each one and join the resulting invokers
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) {
// At least one registry URL: join with the cluster named by AvailableCluster.NAME
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}

// Resolve the "check" flag: own setting first, then the consumer's, otherwise true
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// Create the proxy around the invoker
return (T) proxyFactory.getProxy(invoker);
}

上面逻辑比较简单,主要是加载注册地址以及对多个注册地址或者直连地址的合并处理。核心部分在refprotocol.refer

invoker

refprotocol在当前环境下对应的是RegistryProtocol。

这段代码分析的时候涉及到多个动态加载的类。在进入分析前先罗列下,后续我再把堆栈打印出来直观展示

RegistryProtocol#refer -> ZookeeperRegistryFactory#getRegistry
-> ZookeeperRegistry -> CuratorZookeeperTransporter#connect -> CuratorZookeeperClient

RegistryProtocol#refer -> MockClusterWrapper#join -> MockClusterInvoker#join -> FailoverCluster#join

下面继续调试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * Registry-backed {@link Protocol} (excerpt): refers a service by going through a registry.
 */
public class RegistryProtocol implements Protocol {

    /**
     * Refers {@code type} through the registry described by {@code url}.
     *
     * @param type the service interface being referred
     * @param url  registry URL; its {@code Constants.REGISTRY_KEY} parameter names the real
     *             registry protocol
     * @return an invoker for the service, or an invoker around the registry itself when
     *         {@code type} is {@code RegistryService}
     * @throws RpcException declared by the {@link Protocol} contract
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Swap the protocol for the one named by the registry parameter, then drop that parameter
        String registryProtocol = url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY);
        url = url.setProtocol(registryProtocol).removeParameter(Constants.REGISTRY_KEY);

        // The registry factory picks the registry implementation from the URL
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        String referQuery = url.getParameterAndDecoded(Constants.REFER_KEY);
        Map<String, String> referParams = StringUtils.parseQueryString(referQuery);
        String group = referParams.get(Constants.GROUP_KEY);
        boolean spansSeveralGroups = group != null
                && group.length() > 0
                && ("*".equals(group) || Constants.COMMA_SPLIT_PATTERN.split(group).length > 1);
        if (spansSeveralGroups) {
            // More than one group (or the wildcard): use the mergeable cluster
            return doRefer(getMergeableCluster(), registry, type, url);
        }

        // Single group or no group: refer with the configured cluster
        return doRefer(cluster, registry, type, url);
    }

    /**
     * Builds the registry directory for {@code type}, registers the consumer URL when
     * applicable, subscribes the directory, and joins it into a single invoker.
     *
     * @param cluster  cluster used to join the directory
     * @param registry registry to register with and to attach to the directory
     * @param type     the service interface being referred
     * @param url      registry URL
     * @return the invoker returned by {@code cluster.join(directory)}
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // The register IP becomes the host of the consumer URL and is removed from its parameters
        String registerIp = parameters.remove(Constants.REGISTER_IP_KEY);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, registerIp, 0, type.getName(), parameters);

        boolean shouldRegister = !Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true);
        if (shouldRegister) {
            // Register this consumer under the consumers category, with check=false
            URL consumerUrl = subscribeUrl.addParameters(
                    Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false));
            registry.register(consumerUrl);
        }

        // Subscribe to the providers, configurators and routers categories
        String categories = Constants.PROVIDERS_CATEGORY
                + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY;
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, categories));

        Invoker invoker = cluster.join(directory);
        // Record the consumer in the provider/consumer registration table
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
}

doRefer方法中的 Invoker invoker = cluster.join(directory); 这里需要特殊说明下,这里是通过Wrapper类进到核心的逻辑类FailoverCluster,至于SPI如何调用到Wrapper的类,可以看下 Dubbo源码分析5-Dubbo 服务导出4

对应的堆栈如下图所示

1

2

进入cluster#join对应的failovercluster中

1
2
3
4
5
6
7
8
9
10
11
/**
 * {@link Cluster} implementation whose {@code join} wraps a directory in a
 * {@code FailoverClusterInvoker}.
 */
public class FailoverCluster implements Cluster {

    /** Name constant of this cluster. */
    public static final String NAME = "failover";

    /**
     * Wraps the given directory in a new {@code FailoverClusterInvoker}.
     *
     * @param directory the directory to wrap
     * @return the newly created invoker
     * @throws RpcException declared by the {@link Cluster} contract
     */
    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        final Invoker<T> failoverInvoker = new FailoverClusterInvoker<T>(directory);
        return failoverInvoker;
    }
}
proxy

上一节分析了invoker的来源,得到invoker其实是MockClusterInvoker,下面接着分析另外一个核心点,proxyFactory.getProxy(invoker) 是如何创建proxy

proxyFactory是通过自适应SPI来实例化的。在服务导出的时候,已经分析过它的getInvoker实现

现在我们使用Arthas来看看,这个动态生成的类是怎么样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;

/**
 * Decompiled dump of the runtime-generated adaptive {@link ProxyFactory}.
 * <p>
 * Both methods read the "proxy" URL parameter (default "javassist"), look up the
 * {@link ProxyFactory} extension of that name through {@link ExtensionLoader}, and delegate
 * the call to it.
 */
public class ProxyFactory$Adpative
implements ProxyFactory {
/**
 * Delegates to the {@link ProxyFactory} extension named by the "proxy" parameter of {@code uRL}.
 *
 * @throws IllegalArgumentException if {@code uRL} is null
 * @throws IllegalStateException if no extension name could be read from the URL
 */
public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
// Extension name comes from the "proxy" parameter; "javassist" when it is absent
String string = uRL2.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getInvoker(object, class_, uRL);
}

/**
 * Delegates to the {@link ProxyFactory} extension named by the "proxy" parameter of the
 * invoker's URL.
 *
 * @throws IllegalArgumentException if {@code invoker} or its URL is null
 * @throws IllegalStateException if no extension name could be read from the URL
 */
public Object getProxy(Invoker invoker) throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
// Extension name comes from the "proxy" parameter; "javassist" when it is absent
String string = uRL.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getProxy(invoker);
}
}

可以看出来还是比较简单的,getProxy从invoker中拿到url,然后根据url中的proxy参数来获取到ProxyFactory。这里用的是默认的JavassistProxyFactory

我们继续往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// 首先还是从基类分析
/**
 * Base class for {@link ProxyFactory} implementations: works out which interfaces the proxy
 * has to implement and leaves the actual proxy creation to the subclass.
 */
public abstract class AbstractProxyFactory implements ProxyFactory {

    /**
     * Entry point: creates a non-generic proxy for the invoker.
     *
     * @param invoker the invoker to wrap
     * @return the proxy created by {@link #getProxy(Invoker, boolean)} with {@code generic = false}
     * @throws RpcException declared by the {@link ProxyFactory} contract
     */
    @Override
    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        return getProxy(invoker, false);
    }

    /**
     * Collects the interfaces for the proxy and delegates to the subclass.
     * <p>
     * The list always starts with the invoker's interface and {@code EchoService}. Interfaces
     * named in the URL's comma-separated "interfaces" parameter follow, and
     * {@code GenericService} is appended when {@code generic} is set and the invoker's
     * interface is not already {@code GenericService}.
     *
     * @param invoker the invoker to wrap
     * @param generic whether the proxy should additionally implement {@code GenericService}
     * @return the proxy created by the subclass
     * @throws RpcException declared by the {@link ProxyFactory} contract
     */
    @Override
    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        Class<?>[] interfaces = null;
        // Extra interfaces may be listed in the URL's "interfaces" parameter. The author of these
        // notes found no place that sets it; it is possibly an extension point.
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                // Slots 0 and 1 are reserved for the invoker's interface and EchoService
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // Configured interfaces start at slot 2. Writing at i + 1 would overwrite
                    // EchoService and leave the last slot null.
                    interfaces[i + 2] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }

        // Generic invocation: append GenericService unless it already is the invoker's interface
        if (!invoker.getInterface().equals(GenericService.class) && generic) {
            int len = interfaces.length;
            Class<?>[] temp = interfaces;
            interfaces = new Class<?>[len + 1];
            System.arraycopy(temp, 0, interfaces, 0, len);
            interfaces[len] = GenericService.class;
        }

        // Hand over to the subclass implementation
        return getProxy(invoker, interfaces);
    }
}

/**
 * {@link AbstractProxyFactory} implementation that creates proxies through the bytecode
 * {@code Proxy} helper.
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a proxy implementing {@code interfaces} whose calls go to an
     * {@code InvokerInvocationHandler} wrapping {@code invoker}.
     *
     * @param invoker    the invoker behind the proxy
     * @param interfaces the interfaces the proxy implements
     * @return the new proxy instance
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Obtain (or generate) the Proxy for this interface set, then instantiate it with the handler
        Proxy generatedProxy = Proxy.getProxy(interfaces);
        InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
        return (T) generatedProxy.newInstance(handler);
    }

    // Unrelated code omitted
}
/**
 * Factory for runtime-generated proxy classes (excerpt: only the {@code getProxy} overloads
 * are shown).
 */
public abstract class Proxy {
/**
* Gets the proxy for the given interfaces, using the class loader returned by
* {@code ClassHelper.getClassLoader(Proxy.class)}.
*
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(Class<?>... ics) {

return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
* Gets the proxy for the given interfaces, generating its classes on first use.
* <p>
* Results are cached per class loader, keyed by the interface names joined with ';'.
* On a cache miss two classes are generated: an implementation of all the interfaces that
* forwards every call to an {@code InvocationHandler}, and a {@code Proxy} subclass whose
* {@code newInstance} creates instances of that implementation.
*
* @param cl class loader.
* @param ics interface class array.
* @return Proxy instance.
* @throws IllegalArgumentException if there are more than 65535 interfaces, if an interface is
*         not visible from {@code cl}, or if non-public interfaces come from different packages
* @throws RuntimeException if an entry of {@code ics} is not an interface, or if class
*         generation fails
*/
// Generates the proxy dynamically.
// The official Dubbo documentation walks through this method in detail; the notes below follow it.
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
// Check the number of interfaces
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");

StringBuilder sb = new StringBuilder();
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");

// Reload the interface by name through cl; it must resolve to the very same Class object
Class<?> tmp = null;
try {
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
// Ignored here: tmp stays null, so the identity check below fails and throws
}

if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

sb.append(itf).append(';');
}
// sb now holds all interface names, each followed by ';'
// use interface class name list as key.
String key = sb.toString();

// get cache by class loader.
// One cache map per class loader, created on first use
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}

Proxy proxy = null;
// Lock the per-class-loader cache
synchronized (cache) {
do {
Object value = cache.get(key);
// A cached Reference whose target is still alive is returned directly
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null)
return proxy;
}
// Only one thread generates a given key: others wait while the pending marker is present
if (value == PendingGenerationMarker) {
try {
cache.wait();
} catch (InterruptedException e) {
// NOTE(review): the interrupt is swallowed and the interrupt status is not restored; confirm this is intended
}
} else {
// No usable entry: claim the key with the pending marker and generate outside the lock
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
// Counter value used as the numeric suffix of both generated class names
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);

// Descriptors of methods already generated, and the Method objects in generation order
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();

for (int i = 0; i < ics.length; i++) {
// A non-public interface fixes the package of the generated class;
// all non-public interfaces must therefore come from the same package
if (!Modifier.isPublic(ics[i].getModifiers())) {
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}

// Make the generated implementation class implement this interface
ccp.addInterface(ics[i]);

for (Method method : ics[i].getMethods()) {
// Skip methods whose descriptor was already handled
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc))
continue;
worked.add(desc);

// ix is the index this method will have in the static "methods" array
int ix = methods.size();
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();

// Generated body: copy the arguments into an Object[], call handler.invoke(this, methods[ix], args),
// and for non-void methods return the result converted by asArgument.
// "($w)$n" is presumably Javassist's boxing cast applied to the n-th parameter; verify against Javassist docs.
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if (!Void.TYPE.equals(rt))
code.append(" return ").append(asArgument(rt, "ret")).append(";");

methods.add(method);
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}

// No non-public interface involved: fall back to the default package name
if (pkg == null)
pkg = PACKAGE_NAME;

// create ProxyInstance class.
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
// Fill the generated class's static "methods" field with the collected Method objects
clazz.getField("methods").set(null, methods.toArray(new Method[0]));

// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
// Its newInstance(handler) simply constructs the implementation class generated above
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null)
ccp.release();
if (ccm != null)
ccm.release();
// Replace the pending marker: drop the key on failure, cache a weak reference on success,
// then wake up any threads waiting on this cache
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
cache.put(key, new WeakReference<Proxy>(proxy));
cache.notifyAll();
}
}
return proxy;
}
}

上面的代码主要涉及ccp、ccm两部分,其中ccp用于生成interface的实现类,ccm用于生成Proxy抽象类的子类(其newInstance方法负责创建ccp所生成类的实例)

ccp生成的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * Decompiled dump of the implementation class generated by ccp for DemoService and EchoService.
 * <p>
 * Every interface method packs its arguments into an {@code Object[]} and forwards the call
 * to the {@link InvocationHandler}, passing the matching entry of the static {@code methods} array.
 */
public class proxy0
implements ClassGenerator.DC,
EchoService,
DemoService {
// Method objects of the proxied interface methods; the generated bodies index into it
public static Method[] methods;
// Receives every call made on this proxy
private InvocationHandler handler;

public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}

public proxy0() {
}

// Forwards sayHello to the handler using methods[0]
public String sayHello(String string) {
Object[] arrobject = new Object[]{string};
Object object = this.handler.invoke(this, methods[0], arrobject);
return (String)object;
}

// Forwards $echo to the handler using methods[1]
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
}

ccm生成的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;

/**
 * Decompiled dump of the {@link Proxy} subclass generated by ccm.
 * Its only job is to create {@code proxy0} instances bound to a given handler.
 */
public class Proxy0
extends Proxy
implements ClassGenerator.DC {
// Creates a new proxy0 that forwards its calls to the given handler
public Object newInstance(InvocationHandler invocationHandler) {
return new proxy0(invocationHandler);
}
}

最终我们获取到了生成的Proxy0,大功告成